/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.mongodb.transforms;

import static io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.REWRITE_TOMBSTONE_DELETES_WITH_ID;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.types.ObjectId;
import org.junit.Test;

import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.transforms.extractnewstate.DefaultDeleteHandlingStrategy;
import io.debezium.util.Collect;

/**
 * Integration test for {@link ExtractNewDocumentState} and {@link DefaultDeleteHandlingStrategy}. It sends operations to
 * MongoDB and listens for the messages generated by the Debezium plug-in. The messages
 * are then run through the SMT itself.
 *
 * @author Harvey Yue
 */
public class ExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentStateTestIT {

    /**
     * Supplies the name of the MongoDB collection used by the tests in this class.
     *
     * @return the literal collection name {@code "functional"}
     */
    @Override
    protected String getCollectionName() {
        final String collectionName = "functional";
        return collectionName;
    }

    /**
     * Configures the SMT with tombstone handling {@code "drop"} and asserts that it returns
     * {@code null} both for the delete event and for the record that follows it.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    @FixFor("DBZ-563")
    public void shouldDropTombstoneByDefault() throws InterruptedException {
        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "drop"));

        // Seed the collection with one document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse("{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}"));
        }

        final SourceRecords insertBatch = consumeRecordsByTopic(1);
        final List<SourceRecord> insertedForTopic = insertBatch.recordsForTopic(topicName());
        assertThat(insertedForTopic.size()).isEqualTo(1);

        // Remove the document again
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        }

        // The delete event itself must be swallowed by the SMT
        final SourceRecord deleteEvent = getRecordByOperation(Envelope.Operation.DELETE);
        final SourceRecord deleteAfterSmt = transformation.apply(deleteEvent);
        assertThat(deleteAfterSmt).isNull();

        // The record after the delete is the tombstone; it has to exist before transformation ...
        final SourceRecord tombstone = getNextRecord();
        assertThat(tombstone).isNotNull();

        // ... and must be dropped by the SMT as well
        final SourceRecord tombstoneAfterSmt = transformation.apply(tombstone);
        assertThat(tombstoneAfterSmt).isNull();
    }

    /**
     * Runs one document through its whole lifecycle and checks the transformed schema and
     * values after each step, with tombstone handling set to {@code "tombstone"}.
     * <p>
     * The steps are: insert, single-field {@code $set}, multi-field {@code $set},
     * {@code $unset}, full document replacement and finally delete.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     * @throws IOException declared by the test signature
     */
    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        final Map<String, String> transformationConfig = new HashMap<>();
        transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(transformationConfig);

        // Test insert
        try (var client = connect()) {
            long timestamp = ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toEpochSecond();
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .insertOne(Document.parse(
                            "{"
                                    + "  '_id': 1, "
                                    + "  'dataStr': 'hello', "
                                    + "  'dataInt': 123, "
                                    + "  'dataLong': 80000000000, "
                                    + "  'dataDate': ISODate(\"2020-01-27T10:47:12.311Z\"), "
                                    + "  'dataTimestamp': Timestamp(" + timestamp + ", 1)" // seconds since epoch, operation counter within second
                                    + "}"));
        }

        SourceRecords records = consumeRecordsByTopic(1);

        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        final SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedInsert = transformation.apply(insertRecord);
        final Struct transformedInsertValue = (Struct) transformedInsert.value();

        assertThat(transformedInsert.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedInsert.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(transformedInsert.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedInsert.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        assertThat(transformedInsertValue.get("_id")).isEqualTo(1);
        assertThat(transformedInsertValue.get("dataStr")).isEqualTo("hello");
        assertThat(transformedInsertValue.get("dataInt")).isEqualTo(123);
        assertThat(transformedInsertValue.get("dataLong")).isEqualTo(80_000_000_000L);
        assertThat(transformedInsertValue.get("dataDate")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 27, 10, 47, 12, 311000000, ZoneId.of("UTC")))));
        assertThat(transformedInsertValue.get("dataTimestamp")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")))));

        // Test update
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
                    RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}"));
        }

        records = consumeRecordsByTopic(1);
        final SourceRecord candidateUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
        if (((Struct) candidateUpdateRecord.value()).get("op").equals("c")) {
            // MongoDB is not providing really consistent snapshot, so the initial insert
            // can arrive both in initial sync snapshot and in oplog
            records = consumeRecordsByTopic(1);
        }

        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        final SourceRecord updateRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedUpdate = transformation.apply(updateRecord);
        final Struct transformedUpdateValue = (Struct) transformedUpdate.value();

        assertThat(transformedUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
        assertThat(transformedUpdateValue.get("dataStr")).isEqualTo("bye");

        // Test Update Multiple Fields
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
                    RawBsonDocument.parse("{'$set': {'newStr': 'hello', 'dataInt': 456}}"));
        }

        records = consumeRecordsByTopic(1);

        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        final SourceRecord updateMultipleRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedMultipleUpdate = transformation.apply(updateMultipleRecord);
        final Struct transformedMultipleUpdateValue = (Struct) transformedMultipleUpdate.value();

        assertThat(transformedMultipleUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedMultipleUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(transformedMultipleUpdate.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedMultipleUpdateValue.get("_id")).isEqualTo(1);
        assertThat(transformedMultipleUpdateValue.get("newStr")).isEqualTo("hello");
        assertThat(transformedMultipleUpdateValue.get("dataInt")).isEqualTo(456);

        // Test Update with $unset operation
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
                    RawBsonDocument.parse("{'$unset': {'newStr': ''}}"));
        }

        records = consumeRecordsByTopic(1);

        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        final SourceRecord updateUnsetRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedUnsetUpdate = transformation.apply(updateUnsetRecord);
        final Struct transformedUnsetUpdateValue = (Struct) transformedUnsetUpdate.value();

        assertThat(transformedUnsetUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedUnsetUpdateValue.get("_id")).isEqualTo(1);
        assertThat(transformedUnsetUpdateValue.schema().field("newStr")).isNull();

        // Test FullUpdate
        // A whole-document replacement has to go through replaceOne: the second argument carries
        // no update operator ($set, $unset, ...), which update operations require.
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).replaceOne(RawBsonDocument.parse("{'_id' : 1}"),
                    Document.parse("{'dataStr': 'Hi again'}"));
        }

        records = consumeRecordsByTopic(1);
        final SourceRecord candidateFullUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
        if (((Struct) candidateFullUpdateRecord.value()).get("op").equals("c")) {
            // MongoDB is not providing really consistent snapshot, so the initial insert
            // can arrive both in initial sync snapshot and in oplog
            records = consumeRecordsByTopic(1);
        }

        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        final SourceRecord fullUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedFullUpdate = transformation.apply(fullUpdateRecord);
        final Struct transformedFullUpdateValue = (Struct) transformedFullUpdate.value();

        assertThat(transformedFullUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(transformedFullUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(transformedFullUpdateValue.get("_id")).isEqualTo(1);
        assertThat(transformedFullUpdateValue.get("dataStr")).isEqualTo("Hi again");

        // Test Delete
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        }

        // Two records are expected for the delete: the delete event and its tombstone
        records = consumeRecordsByTopic(2);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);

        // Test mongo Deletion operation: the transformed delete is expected to carry a null value
        final SourceRecord deleteRecord = records.recordsForTopic(this.topicName()).get(0);
        final SourceRecord transformedDelete = transformation.apply(deleteRecord);
        final Struct transformedDeleteValue = (Struct) transformedDelete.value();

        assertThat(transformedDeleteValue).isNull();

        // Test tombstone record: the SMT is expected to return null for it
        final SourceRecord tombstoneRecord = records.recordsForTopic(this.topicName()).get(1);
        final SourceRecord transformedTombstone = transformation.apply(tombstoneRecord);

        assertThat(transformedTombstone).isNull();
    }

    /**
     * Inserts a document whose {@code data} field is a DBRef-style sub-document
     * ({@code $ref}, {@code $id}, {@code $db}) and asserts that, with
     * {@code field.name.adjustment.mode} set to {@code avro}, the transformed value exposes
     * those entries as {@code _ref}, {@code _id} and {@code _db}.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     * @throws IOException declared by the test signature
     */
    @Test
    @FixFor("DBZ-1767")
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        smtOptions.put("array.encoding", "array");
        smtOptions.put("operation.header", "true");
        smtOptions.put("field.name.adjustment.mode", "avro");
        transformation.configure(smtOptions);

        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse("{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }"));
        }

        final SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> consumedForTopic = consumed.recordsForTopic(topicName());
        assertThat(consumedForTopic.size()).isEqualTo(1);

        final SourceRecord insertEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(insertEvent);
        validate(transformed);

        // The reference lives under the "data" field of the flattened value
        final Struct flattened = (Struct) transformed.value();
        final Struct dbRef = flattened.getStruct("data");

        assertThat(dbRef.getString("_ref")).isEqualTo("a2");
        assertThat(dbRef.getInt32("_id")).isEqualTo(4);
        assertThat(dbRef.getString("_db")).isEqualTo("b2");
    }

    /**
     * Inserts a document with a nested field named {@code metric::fct} and asserts that,
     * with {@code field.name.adjustment.mode} set to {@code avro}, the nested struct is
     * reachable as {@code metric__fct} and keeps its {@code min}/{@code max} values.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     * @throws IOException declared by the test signature
     */
    @Test
    @FixFor("DBZ-2680")
    public void shouldSupportSubSanitizeFieldName() throws InterruptedException, IOException {
        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        smtOptions.put("array.encoding", "array");
        smtOptions.put("operation.header", "true");
        smtOptions.put("field.name.adjustment.mode", "avro");
        transformation.configure(smtOptions);

        // Nested field name contains characters that need sanitizing
        final String json = "{" +
                "  \"_id\": \"222\"," +
                "  \"metrics\": {" +
                "    \"metric::fct\": {" +
                "      \"min\": 0," +
                "      \"max\": 1," +
                "    }," +
                "  }" +
                "}";

        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse(json));
        }

        final SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> consumedForTopic = consumed.recordsForTopic(topicName());
        assertThat(consumedForTopic.size()).isEqualTo(1);

        final SourceRecord insertEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(insertEvent);
        validate(transformed);

        final Struct flattened = (Struct) transformed.value();
        final Struct metrics = flattened.getStruct("metrics");
        final Struct sanitizedMetric = metrics.getStruct("metric__fct");
        assertThat(sanitizedMetric.getInt32("min")).isEqualTo(0);
        assertThat(sanitizedMetric.getInt32("max")).isEqualTo(1);
    }

    /**
     * Configures {@code ADD_FIELDS} with {@code "ord , db"} and asserts that the transformed
     * update event carries {@code __ord} and {@code __db} fields equal to the values in the
     * original record's source struct.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    @FixFor("DBZ-1442")
    public void shouldAddFields() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        smtOptions.put(ADD_FIELDS, "ord , db");
        transformation.configure(smtOptions);

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse("{ '_id' : 3, 'name' : 'Tim' }"));
        }

        SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Change its name
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.updateOne(
                    RawBsonDocument.parse("{'_id' : 3}"),
                    RawBsonDocument.parse("{'$set': {'name': 'Sally'}}"));
        }

        consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Remember the source block of the untransformed update event
        final SourceRecord updateEvent = consumed.allRecordsInOrder().get(0);
        final Struct envelope = (Struct) updateEvent.value();
        final Struct source = envelope.getStruct(Envelope.FieldName.SOURCE);

        // Run the SMT
        final SourceRecord transformed = transformation.apply(updateEvent);
        validate(transformed);

        // The added fields must mirror the source block
        final Struct flattened = (Struct) transformed.value();
        assertThat(flattened.get("__ord")).isEqualTo(source.getInt32("ord"));
        assertThat(flattened.get("__db")).isEqualTo(source.getString("db"));
        assertThat(flattened.get("__db")).isEqualTo(DB_NAME);
    }

    /**
     * Configures {@code ADD_FIELDS} with {@code "ord,db"} and tombstone handling
     * {@code "rewrite"}, then asserts that the transformed delete event carries
     * {@code __ord} and {@code __db} fields equal to the values in the original record's
     * source struct.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    @FixFor("DBZ-1442")
    public void shouldAddFieldsForRewriteDeleteEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(ADD_FIELDS, "ord,db");
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
        transformation.configure(smtOptions);

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
        }

        SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Remove it again
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
        }

        // Two records are expected for the delete
        consumed = consumeRecordsByTopic(2);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // Remember the source block of the first (untransformed) record
        final SourceRecord deleteEvent = consumed.allRecordsInOrder().get(0);
        final Struct envelope = (Struct) deleteEvent.value();
        final Struct source = envelope.getStruct(Envelope.FieldName.SOURCE);

        // Run the SMT
        final SourceRecord transformed = transformation.apply(deleteEvent);
        validate(transformed);

        // The added fields must mirror the source block
        final Struct rewritten = (Struct) transformed.value();
        assertThat(rewritten.get("__ord")).isEqualTo(source.getInt32("ord"));
        assertThat(rewritten.get("__db")).isEqualTo(source.getString("db"));
        assertThat(rewritten.get("__db")).isEqualTo(DB_NAME);
    }

    /**
     * Same scenario as the rewrite-delete test, but with
     * {@code REWRITE_TOMBSTONE_DELETES_WITH_ID} enabled and {@code op} added to
     * {@code ADD_FIELDS}. Asserts that the transformed delete additionally exposes
     * {@code __deleted = true}, {@code __op = "d"} and the document's {@code _id}.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    @FixFor("DBZ-7695")
    public void shouldAddFieldsForRewriteDeleteEventWithId() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(ADD_FIELDS, "ord,db,op");
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
        smtOptions.put(REWRITE_TOMBSTONE_DELETES_WITH_ID.name(), "true");
        transformation.configure(smtOptions);

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
        }

        SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Remove it again
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
        }

        // Two records are expected for the delete
        consumed = consumeRecordsByTopic(2);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // Remember the source block of the first (untransformed) record
        final SourceRecord deleteEvent = consumed.allRecordsInOrder().get(0);
        final Struct envelope = (Struct) deleteEvent.value();
        final Struct source = envelope.getStruct(Envelope.FieldName.SOURCE);

        // Run the SMT
        final SourceRecord transformed = transformation.apply(deleteEvent);
        validate(transformed);

        // Added metadata fields must mirror the source block; the id must survive the rewrite
        final Struct rewritten = (Struct) transformed.value();
        assertThat(rewritten.get("__ord")).isEqualTo(source.getInt32("ord"));
        assertThat(rewritten.get("__db")).isEqualTo(source.getString("db"));
        assertThat(rewritten.get("__db")).isEqualTo(DB_NAME);
        assertThat(rewritten.get("__deleted")).isEqualTo(true);
        assertThat(rewritten.get("__op")).isEqualTo("d");
        assertThat(rewritten.get("_id")).isEqualTo(4);
    }

    /**
     * Inserts a document with an {@link ObjectId} key plus string, long, boolean and
     * double-array fields, then asserts the operation header, the key and the complete
     * value (schema name, field schemas, field values, field count) of the transformed record.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    public void shouldTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        smtOptions.put(ADD_HEADERS, "op");
        transformation.configure(smtOptions);

        final ObjectId documentId = new ObjectId();
        final Document toInsert = new Document("_id", documentId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", true)
                .append("scores", Arrays.asList(1.2, 3.4, 5.6));

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(toInsert);
        }

        final SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Run the SMT on the only record
        final SourceRecord insertEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(insertEvent);
        validate(transformed);

        // The operation header has to be present and report a create
        final Iterator<Header> opHeaders = transformed.headers()
                .allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
        assertThat(opHeaders.hasNext()).isTrue();
        assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();
        final Schema keySchema = key.schema();
        final Schema valueSchema = value.schema();

        // Key: the ObjectId is expected in its string form
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // Value: schema name and field values
        assertThat(valueSchema.name()).isEqualTo(SERVER_NAME + "." + DB_NAME + "." + getCollectionName());
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(value.get("_id")).isEqualTo(documentId.toString());
        assertThat(value.get("phone")).isEqualTo(123L);
        assertThat(value.get("active")).isEqualTo(true);
        assertThat(value.get("scores")).isEqualTo(Arrays.asList(1.2, 3.4, 5.6));

        // Value: per-field schemas and total field count
        assertThat(valueSchema.field("_id").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("name").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("phone").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        assertThat(valueSchema.field("active").schema()).isEqualTo(Schema.OPTIONAL_BOOLEAN_SCHEMA);
        assertThat(valueSchema.field("scores").schema()).isEqualTo(SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build());
        assertThat(valueSchema.fields()).hasSize(5);
    }

    /**
     * Inserts a document whose {@code _id} is itself a sub-document ({@code company},
     * {@code dept}) and asserts that both the transformed key and the transformed value
     * expose that id as a nested struct with the expected schemas and values.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    public void shouldTransformRecordForInsertEventWithComplexIdType() throws InterruptedException {
        waitForStreamingRunning();
        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final Document compositeId = new Document()
                .append("company", 32)
                .append("dept", "home improvement");
        final Document toInsert = new Document()
                .append("_id", compositeId)
                .append("name", "Sally");

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(toInsert);
        }

        final SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Run the SMT on the only record
        final SourceRecord insertEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(insertEvent);
        validate(transformed);

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // Key: nested id struct
        assertThat(key.schema()).isSameAs(transformed.keySchema());
        final Schema keyIdSchema = key.schema().field("id").schema();
        assertThat(keyIdSchema.field("company").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(keyIdSchema.field("dept").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(((Struct) key.get("id")).get("company")).isEqualTo(32);
        assertThat(((Struct) key.get("id")).get("dept")).isEqualTo("home improvement");

        // Value: nested id struct plus the name
        assertThat(value.schema()).isSameAs(transformed.valueSchema());
        assertThat(((Struct) value.get("_id")).get("company")).isEqualTo(32);
        assertThat(((Struct) value.get("_id")).get("dept")).isEqualTo("home improvement");
        assertThat(value.get("name")).isEqualTo("Sally");

        final Schema valueIdSchema = value.schema().field("_id").schema();
        assertThat(valueIdSchema.field("company").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(valueIdSchema.field("dept").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(value.schema().field("name").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Inserts a document, renames it through a {@code $set} update and asserts the
     * operation header, key and value (schemas, values, field count) of the transformed
     * update record.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtOptions = new HashMap<>();
        smtOptions.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        smtOptions.put(ADD_HEADERS, "op");
        transformation.configure(smtOptions);

        final ObjectId documentId = new ObjectId();
        final Document toInsert = new Document("_id", documentId).append("name", "Tim");

        // Create the document
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(toInsert);
        }

        SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document rename = new Document().append("$set", new Document("name", "Sally"));

        // Apply the rename, addressing the document by its ObjectId
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}"), rename);
        }

        consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Run the SMT on the only record
        final SourceRecord updateEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(updateEvent);
        validate(transformed);

        // The operation header has to be present and report an update
        final Iterator<Header> opHeaders = transformed.headers()
                .allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
        assertThat(opHeaders.hasNext()).isTrue();
        assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();
        final Schema keySchema = key.schema();
        final Schema valueSchema = value.schema();

        // Key: the ObjectId is expected in its string form
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // Value: new name and id
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(value.get("_id")).isEqualTo(documentId.toString());

        assertThat(valueSchema.field("_id").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("name").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.fields()).hasSize(2);
    }

    /**
     * Restarts the connector in capture mode {@code CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE},
     * creates the collection with change stream pre- and post-images enabled, and asserts that
     * after a combined {@code $set}/{@code $unset} update the transformed value holds the new
     * name, has no {@code phone} or {@code active} field and has exactly two fields.
     *
     * @throws InterruptedException if interrupted while waiting for connector records
     */
    @Test
    @SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
    public void shouldGenerateRecordForPartialUpdateEvent() throws InterruptedException {
        final Configuration connectorConfig = getBaseConfigBuilder()
                .with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE)
                .build();
        restartConnectorWithConfig(connectorConfig);
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final ObjectId documentId = new ObjectId();
        final Document toInsert = new Document("_id", documentId)
                .append("name", "Tim")
                .append("phone", 123L)
                .append("active", false);

        // Create the collection with pre/post images enabled, then insert the document
        try (var mongo = connect()) {
            final MongoDatabase database = mongo.getDatabase(DB_NAME);
            final CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
            collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
            database.createCollection(getCollectionName(), collectionOptions);

            mongo.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(toInsert);
        }

        SourceRecords consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // the value of "$unset" doesn't matter, and they'll all be unset.
        // https://www.mongodb.com/docs/manual/reference/operator/update/unset/#mongodb-update-up.-unset
        final Document fieldsToUnset = new Document().append("phone", true).append("active", false);
        final Document partialUpdate = new Document()
                .append("$set", new Document("name", "Sally"))
                .append("$unset", fieldsToUnset);

        // Apply the update, addressing the document by its ObjectId
        try (var mongo = connect()) {
            final var collection = mongo.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}"), partialUpdate);
        }

        consumed = consumeRecordsByTopic(1);
        assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Run the SMT on the only record
        final SourceRecord updateEvent = consumed.allRecordsInOrder().get(0);
        final SourceRecord transformed = transformation.apply(updateEvent);
        validate(transformed);

        final Struct value = (Struct) transformed.value();
        final Schema valueSchema = value.schema();

        // New name is present; the unset fields are gone from the schema
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(valueSchema.field("phone")).isNull();
        assertThat(valueSchema.field("active")).isNull();
        assertThat(valueSchema.fields()).hasSize(2);
    }

    /**
     * Restarts the connector in {@code CHANGE_STREAMS} capture mode, inserts a document with four fields and
     * then applies a {@code $set}-only update to {@code name}. The transformed update record must carry the
     * new name and a value schema with exactly two fields, without {@code phone} or {@code active}.
     */
    @Test
    public void shouldGenerateRecordForSetOnlyPartialUpdateEvent() throws InterruptedException {
        restartConnectorWithConfig(getBaseConfigBuilder()
                .with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS)
                .build());
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId)
                .append("name", "Tim")
                .append("phone", 123L)
                .append("active", false);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document("$set", new Document("name", "Sally"));

        // apply the update to the inserted document
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).updateOne(byId, update);
        }

        final SourceRecords updateRecords = consumeRecordsByTopic(1);
        assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single update record
        final SourceRecord transformed = transformation.apply(updateRecords.allRecordsInOrder().get(0));
        validate(transformed);

        final Struct value = (Struct) transformed.value();
        final Schema valueSchema = value.schema();

        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        // no pre-image, so these 2 fields shouldn't be visible
        assertThat(valueSchema.field("phone")).isNull();
        assertThat(valueSchema.field("active")).isNull();
        assertThat(valueSchema.fields()).hasSize(2);
    }

    /**
     * Inserts a document with {@code name}, {@code phone} and {@code active}, then applies an update that sets
     * {@code name} and unsets both {@code phone} and {@code active}. The transformed update record must carry
     * the new name and a value schema with exactly two fields that contains neither of the unset fields.
     */
    @Test
    @FixFor("DBZ-612")
    public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Tim")
                .append("phone", 123L)
                .append("active", false);

        // insert
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
        }

        SourceRecords records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // the values given to "$unset" are irrelevant; both listed fields are removed
        Document updateObj = new Document()
                .append("$set", new Document("name", "Sally"))
                .append("$unset", new Document().append("phone", true).append("active", false));

        // update
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
        }

        records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Extract values from SourceRecord
        final SourceRecord record = records.allRecordsInOrder().get(0);

        // Perform transformation
        final SourceRecord transformed = transformation.apply(record);
        validate(transformed);

        Struct value = (Struct) transformed.value();

        // and then assert value and its schema
        assertThat(value.schema()).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        // every field named in "$unset" must be gone, not just the first one
        assertThat(value.schema().field("phone")).isNull();
        assertThat(value.schema().field("active")).isNull();
        assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Inserts a document with {@code name}, {@code phone} and {@code active}, then applies an update consisting
     * solely of an {@code $unset} of {@code phone} and {@code active}. The transformed update record must have a
     * value schema with exactly two fields that contains neither of the unset fields.
     */
    @Test
    @FixFor("DBZ-612")
    public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", false);

        // insert
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
        }

        SourceRecords records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // the values given to "$unset" are irrelevant; both listed fields are removed
        Document updateObj = new Document()
                .append("$unset", new Document().append("phone", true).append("active", false));

        // update
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
        }

        records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Extract values from SourceRecord
        final SourceRecord record = records.allRecordsInOrder().get(0);

        // Perform transformation
        final SourceRecord transformed = transformation.apply(record);
        validate(transformed);

        Struct value = (Struct) transformed.value();

        // and then assert value and its schema
        assertThat(value.schema()).isSameAs(transformed.valueSchema());
        // every field named in "$unset" must be gone, not just the first one
        assertThat(value.schema().field("phone")).isNull();
        assertThat(value.schema().field("active")).isNull();
        assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Restarts the connector via {@code restartConnectorWithoutEmittingTombstones()}, inserts and then deletes
     * a document, and checks that exactly one record is consumed for the delete. After transformation that
     * record must keep the document id in its key and have a {@code null} value.
     */
    @Test
    @FixFor("DBZ-582")
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
        // todo: we might want to rework how the base class handles connector bootstrap to avoid needing to
        // stop the connector and restart it with the appropriate configuration here
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete the document again
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(byId);
        }

        final SourceRecords deleteRecords = consumeRecordsByTopic(1);
        assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single delete record
        final SourceRecord transformed = transformation.apply(deleteRecords.allRecordsInOrder().get(0));
        validate(transformed);

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // the key still identifies the deleted document
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // the value is gone
        assertThat(value).isNull();
    }

    /**
     * Configures the transformation to add the {@code op} header and to handle deletes in
     * {@code rewrite-with-tombstone} mode, then inserts and deletes a document. The second of the two records
     * consumed after the delete (presumably the tombstone, going by the test name) must, once transformed,
     * keep the document id in its key, carry the delete operation code in the operation header and have a
     * {@code null} value.
     */
    @Test
    @FixFor("DBZ-1032")
    public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op");
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "rewrite-with-tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete the document again
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(byId);
        }

        final SourceRecords deleteRecords = consumeRecordsByTopic(2);
        assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // unwrap the second record consumed after the delete
        final SourceRecord transformed = transformation.apply(deleteRecords.allRecordsInOrder().get(1));
        validate(transformed);

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // the key still identifies the deleted document
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // the operation header must report a delete
        final Iterator<Header> opHeaders = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
        assertThat(opHeaders.hasNext()).isTrue();
        assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());

        assertThat(value).isNull();
    }

    /**
     * Restarts the connector via {@code restartConnectorWithoutEmittingTombstones()}, configures the
     * transformation with delete handling {@code drop}, then inserts and deletes a document. Applying the
     * transformation to the single record consumed for the delete must return {@code null}.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "drop");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete the document again
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(byId);
        }

        final SourceRecords deleteRecords = consumeRecordsByTopic(1);
        assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // the delete record must be swallowed by the transformation
        final SourceRecord deleteRecord = deleteRecords.allRecordsInOrder().get(0);
        assertThat(transformation.apply(deleteRecord)).isNull();
    }

    /**
     * Restarts the connector via {@code restartConnectorWithoutEmittingTombstones()}, configures the
     * transformation with delete handling {@code rewrite}, then inserts and deletes a document. The transformed
     * delete record must keep the document id in its key and expose an optional boolean {@code __deleted}
     * field set to {@code true} in its value.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldRewriteDeleteMessage() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "rewrite"));

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete the document again
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(byId);
        }

        final SourceRecords deleteRecords = consumeRecordsByTopic(1);
        assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single delete record
        final SourceRecord transformed = transformation.apply(deleteRecords.allRecordsInOrder().get(0));

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // the key still identifies the deleted document
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // the value is flagged as deleted
        assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        assertThat(value.get("__deleted")).isEqualTo(true);
    }

    /**
     * Configures the transformation with delete handling {@code rewrite}, inserts a document and updates its
     * {@code name}. The transformed update record must expose an optional boolean {@code __deleted} field set
     * to {@code false}.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "rewrite"));

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId).append("name", "Tim");

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document("$set", new Document("name", "Sally"));

        // apply the update to the inserted document
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).updateOne(byId, update);
        }

        final SourceRecords updateRecords = consumeRecordsByTopic(1);
        assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single update record
        final SourceRecord transformed = transformation.apply(updateRecords.allRecordsInOrder().get(0));
        final Struct value = (Struct) transformed.value();

        // a non-delete record is flagged as not deleted
        assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        assertThat(value.get("__deleted")).isEqualTo(false);
    }

    /**
     * Configures the transformation to add the {@code op} header with delete handling {@code tombstone}, then
     * inserts and deletes a document. The first of the two records consumed after the delete must, once
     * transformed, carry the delete operation code in the operation header, keep the document id in its key
     * and have a {@code null} value.
     */
    @Test
    public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op");
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete the document again
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(byId);
        }

        final SourceRecords deleteRecords = consumeRecordsByTopic(2);
        assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // unwrap the first record consumed after the delete
        final SourceRecord transformed = transformation.apply(deleteRecords.allRecordsInOrder().get(0));

        // the operation header must report a delete
        final Iterator<Header> opHeaders = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
        assertThat(opHeaders.hasNext()).isTrue();
        assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // the key still identifies the deleted document
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        assertThat(value).isNull();
    }

    /**
     * Inserts and updates a document, attaches a custom string header to the consumed update record and
     * applies the transformation. The transformed record must carry exactly one header, namely the custom
     * one with its original value.
     */
    @Test
    @FixFor("DBZ-971")
    public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "drop");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document original = new Document("_id", documentId).append("name", "Tim");

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document("$set", new Document("name", "Sally"));

        // apply the update to the inserted document
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).updateOne(byId, update);
        }

        final SourceRecords updateRecords = consumeRecordsByTopic(1);
        assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // decorate the update record with a header before it reaches the transformation
        final String headerName = "application/debezium-test-header";
        final SourceRecord updateRecord = updateRecords.allRecordsInOrder().get(0);
        updateRecord.headers().addString(headerName, "shouldPropagatePreviousRecordHeaders");

        final SourceRecord transformed = transformation.apply(updateRecord);

        // the header must survive the transformation unchanged
        assertThat(transformed.headers()).hasSize(1);
        final Iterator<Header> propagated = transformed.headers().allWithName(headerName);
        assertThat(propagated.hasNext()).isTrue();
        assertThat(propagated.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders");
    }

    /**
     * Inserts a document containing a nested {@code address} sub-document without enabling struct flattening.
     * The transformed value must have three fields, with {@code address} kept as an optional struct whose
     * schema is named after server, database and collection and holds {@code street} and {@code zipcode}.
     */
    @Test
    public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document("street", "Morris Park Ave").append("zipcode", "10462");
        final Document original = new Document("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single insert record
        final SourceRecord transformed = transformation.apply(insertRecords.allRecordsInOrder().get(0));

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // key and key schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // value contents; the nested document stays a struct
        final Schema valueSchema = value.schema();
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(value.get("_id")).isEqualTo(documentId.toString());
        final Struct expectedAddress = new Struct(valueSchema.field("address").schema())
                .put("street", "Morris Park Ave")
                .put("zipcode", "10462");
        assertThat(value.get("address")).isEqualTo(expectedAddress);

        // value schema
        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        final Schema expectedAddressSchema = SchemaBuilder.struct()
                .name(SERVER_NAME + "." + DB_NAME + "." + getCollectionName() + ".address")
                .optional()
                .field("street", Schema.OPTIONAL_STRING_SCHEMA)
                .field("zipcode", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        assertThat(valueSchema.field("address").schema()).isEqualTo(expectedAddressSchema);
        assertThat(valueSchema.fields()).hasSize(3);
    }

    /**
     * Inserts a document containing a nested {@code address} sub-document with struct flattening enabled and
     * no explicit delimiter. The transformed value must have four fields, with the nested entries exposed as
     * {@code address_street} and {@code address_zipcode}.
     */
    @Test
    public void shouldFlattenTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(FLATTEN_STRUCT, "true");
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document("street", "Morris Park Ave").append("zipcode", "10462");
        final Document original = new Document("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single insert record
        final SourceRecord transformed = transformation.apply(insertRecords.allRecordsInOrder().get(0));

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // key and key schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // value contents; nested fields are flattened
        final Schema valueSchema = value.schema();
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(value.get("_id")).isEqualTo(documentId.toString());
        assertThat(value.get("address_street")).isEqualTo("Morris Park Ave");
        assertThat(value.get("address_zipcode")).isEqualTo("10462");

        // value schema
        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address_street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address_zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.fields()).hasSize(4);
    }

    /**
     * Inserts a document containing a nested {@code address} sub-document with struct flattening enabled and
     * {@code -} as delimiter. The transformed value must have four fields, with the nested entries exposed as
     * {@code address-street} and {@code address-zipcode}.
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = Collect.hashMapOf(
                FLATTEN_STRUCT, "true",
                DELIMITER, "-",
                HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document("street", "Morris Park Ave").append("zipcode", "10462");
        final Document original = new Document("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single insert record
        final SourceRecord transformed = transformation.apply(insertRecords.allRecordsInOrder().get(0));

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // key and key schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // value contents; nested fields are flattened using the configured delimiter
        final Schema valueSchema = value.schema();
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("name")).isEqualTo("Sally");
        assertThat(value.get("_id")).isEqualTo(documentId.toString());
        assertThat(value.get("address-street")).isEqualTo("Morris Park Ave");
        assertThat(value.get("address-zipcode")).isEqualTo("10462");

        // value schema
        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address-street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address-zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.fields()).hasSize(4);
    }

    /**
     * Inserts a document with a nested {@code address} sub-document and then {@code $set}s three additional
     * dotted paths below {@code address}, with struct flattening enabled and {@code -} as delimiter. The
     * transformed update value must have seven fields and expose the new entries as {@code address-city},
     * {@code address-name} and {@code address-city2-part}.
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(FLATTEN_STRUCT, "true");
        smtConfig.put(DELIMITER, "-");
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document("street", "Morris Park Ave").append("zipcode", "10462");
        final Document original = new Document("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        // insert the document
        try (var client = connect()) {
            final var collection = client.getDatabase(DB_NAME).getCollection(getCollectionName());
            collection.insertOne(original);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // new nested fields, addressed through dotted paths
        final Map<String, Object> addedFields = Collect.hashMapOf(
                "address.city", "Canberra",
                "address.name", "James",
                "address.city2.part", 3);
        final Document update = new Document("$set", new Document(addedFields));

        // apply the update to the inserted document
        try (var client = connect()) {
            final var byId = RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}");
            client.getDatabase(DB_NAME).getCollection(getCollectionName()).updateOne(byId, update);
        }

        final SourceRecords updateRecords = consumeRecordsByTopic(1);
        assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // unwrap the single update record
        final SourceRecord transformed = transformation.apply(updateRecords.allRecordsInOrder().get(0));

        final Struct key = (Struct) transformed.key();
        final Struct value = (Struct) transformed.value();

        // key and key schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(transformed.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(documentId.toString());

        // value contents; the newly set nested fields are flattened using the configured delimiter
        final Schema valueSchema = value.schema();
        assertThat(valueSchema).isSameAs(transformed.valueSchema());
        assertThat(value.get("_id")).isEqualTo(documentId.toString());
        assertThat(value.get("address-city")).isEqualTo("Canberra");
        assertThat(value.get("address-name")).isEqualTo("James");
        assertThat(value.get("address-city2-part")).isEqualTo(3);

        // value schema
        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address-city").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address-name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("address-city2-part").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        assertThat(valueSchema.fields()).hasSize(7);
    }

    /**
     * Configures the transformation to add the {@code op} header and applies it to the record returned by
     * {@code createCreateRecord()}. The result must carry exactly one header, {@code __op}, holding the
     * create operation code.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddHeader() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op");
        smtConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(smtConfig);

        final SourceRecord transformed = transformation.apply(createCreateRecord());

        assertThat(transformed.headers()).hasSize(1);
        final Object opHeader = getSourceRecordHeaderByKey(transformed, "__op");
        assertThat(opHeader).isEqualTo(Envelope.Operation.CREATE.code());
    }

    /**
     * Requests headers for {@code op} and {@code id} and checks that the transformed create record
     * exposes two headers: {@code __op} with the create operation code, while the lookup of
     * {@code __id} yields {@code null}.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddHeadersForMissingOrInvalidFields() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = Collect.hashMapOf(
                ADD_HEADERS, "op,id",
                HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        assertThat(result.headers()).hasSize(2);
        assertThat(getSourceRecordHeaderByKey(result, "__op")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(getSourceRecordHeaderByKey(result, "__id")).isNull();
    }

    /**
     * Requests headers for {@code op} plus two members of the {@code source} struct, using the
     * custom header prefix {@code prefix.}. The test expects exactly two headers on the transformed
     * create record and verifies the values of {@code prefix.op} and
     * {@code prefix.source_collection}.
     */
    @Test
    @FixFor({ "DBZ-1791", "DBZ-2504" })
    public void testAddHeadersSpecifyingStruct() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        config.put(ADD_HEADERS_PREFIX, "prefix.");
        config.put(ADD_HEADERS, "op,source.rs,source.collection");
        transformation.configure(config);

        final SourceRecord result = transformation.apply(createCreateRecord());

        // NOTE(review): three fields are configured but only two headers are expected here
        assertThat(result.headers()).hasSize(2);
        assertThat(getSourceRecordHeaderByKey(result, "prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(getSourceRecordHeaderByKey(result, "prefix.source_collection")).isEqualTo(getCollectionName());
    }

    /**
     * Configures the transformation to add the {@code op} field to the value and checks that the
     * transformed create record contains {@code __op} with the create operation code.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddField() throws Exception {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(
                ADD_FIELDS, "op",
                HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final SourceRecord result = transformation.apply(createCreateRecord());
        final Struct resultValue = (Struct) result.value();

        assertThat(resultValue.get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
    }

    /**
     * Uses the {@code source:TARGET} mapping syntax for both added fields and added headers, with an
     * empty field prefix and the header prefix {@code prefix.}. Checks that the value contains the
     * renamed field and that the single header is stored under the prefixed, renamed key.
     */
    @Test
    @FixFor({ "DBZ-2606", "DBZ-6773" })
    public void testNewFieldAndHeaderMapping() throws Exception {
        waitForStreamingRunning();

        final String fieldPrefix = "";
        final String headerPrefix = "prefix.";

        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        // field mapping: op -> OP, with an empty prefix
        config.put(ADD_FIELDS, "op:OP");
        config.put(ADD_FIELDS_PREFIX, fieldPrefix);
        // header mapping: op -> OPERATION, with the custom prefix
        config.put(ADD_HEADERS, "op:OPERATION");
        config.put(ADD_HEADERS_PREFIX, headerPrefix);
        transformation.configure(config);

        final SourceRecord result = transformation.apply(createCreateRecord());
        final Struct resultValue = (Struct) result.value();

        assertThat(resultValue.get(fieldPrefix + "OP")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(result.headers()).hasSize(1);
        assertThat(getSourceRecordHeaderByKey(result, headerPrefix + "OPERATION")).isEqualTo(Envelope.Operation.CREATE.code());
    }

    /**
     * Adds the {@code op} and {@code ts_ms} fields (the list is written with surrounding
     * whitespace) under the prefix {@code prefix.} and checks both appear in the transformed value.
     */
    @Test
    @FixFor({ "DBZ-1791", "DBZ-2504" })
    public void testAddFields() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        config.put(ADD_FIELDS_PREFIX, "prefix.");
        config.put(ADD_FIELDS, "op , ts_ms");
        transformation.configure(config);

        final SourceRecord result = transformation.apply(createCreateRecord());
        final Struct resultValue = (Struct) result.value();

        assertThat(resultValue.get("prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(resultValue.get("prefix.ts_ms")).isNotNull();
    }

    /**
     * Adds the {@code op} and {@code id} fields to the value and checks that {@code __op} holds the
     * create operation code while {@code __id} is {@code null} in the transformed record.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldsForMissingOptionalField() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = Collect.hashMapOf(
                ADD_FIELDS, "op,id",
                HANDLE_TOMBSTONE_DELETES, "tombstone");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final Struct resultValue = (Struct) transformation.apply(source).value();

        assertThat(resultValue.get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(resultValue.get("__id")).isNull();
    }

    /**
     * Adds {@code op} and the nested {@code source.collection} member to the value and checks that
     * they surface as {@code __op} and {@code __source_collection} in the transformed record.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldsSpecifyStruct() throws Exception {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(
                ADD_FIELDS, "op,source.collection",
                HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final SourceRecord result = transformation.apply(createCreateRecord());
        final Struct resultValue = (Struct) result.value();

        assertThat(resultValue.get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
        assertThat(resultValue.get("__source_collection")).isEqualTo(getCollectionName());
    }

    /**
     * With delete handling set to {@code rewrite} and {@code op} added as a field, checks that the
     * transformed delete record is flagged with {@code __deleted = true} and carries the delete
     * operation code in {@code __op}.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(
                HANDLE_TOMBSTONE_DELETES, "rewrite",
                ADD_FIELDS, "op"));

        // the first emitted record is the delete event itself
        final SourceRecord deleteEvent = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final Struct rewritten = (Struct) transformation.apply(deleteEvent).value();

        assertThat(rewritten.get("__deleted")).isEqualTo(true);
        assertThat(rewritten.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
    }

    /**
     * With delete handling set to {@code rewrite} and both {@code op} and {@code ts_ms} added as
     * fields, checks that the transformed delete record carries {@code __deleted = true}, the delete
     * operation code and a non-null {@code __ts_ms}.
     */
    @Test
    @FixFor("DBZ-1791")
    public void tesAddFieldsHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = Collect.hashMapOf(
                HANDLE_TOMBSTONE_DELETES, "rewrite",
                ADD_FIELDS, "op,ts_ms");
        transformation.configure(config);

        final SourceRecord deleteEvent = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(deleteEvent);
        final Struct rewritten = (Struct) result.value();

        assertThat(rewritten.get("__deleted")).isEqualTo(true);
        assertThat(rewritten.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        assertThat(rewritten.get("__ts_ms")).isNotNull();
    }

    /**
     * With delete handling set to {@code rewrite}, adds {@code op} and the nested
     * {@code source.collection} member and checks that the transformed delete record contains
     * {@code __deleted = true}, the delete operation code and the collection name.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldsSpecifyStructHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(
                HANDLE_TOMBSTONE_DELETES, "rewrite",
                ADD_FIELDS, "op,source.collection"));

        final SourceRecord deleteEvent = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final Struct rewritten = (Struct) transformation.apply(deleteEvent).value();

        assertThat(rewritten.get("__deleted")).isEqualTo(true);
        assertThat(rewritten.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        assertThat(rewritten.get("__source_collection")).isEqualTo(getCollectionName());
    }

    /**
     * With delete handling set to {@code rewrite-with-tombstone}, checks that the delete record is
     * rewritten (with {@code __deleted}, {@code __op} and {@code __ts_ms} populated) and that the
     * record following it is transformed into a record with a {@code null} value.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(
                HANDLE_TOMBSTONE_DELETES, "rewrite-with-tombstone",
                ADD_FIELDS, "op,ts_ms"));

        final List<SourceRecord> events = createDeleteRecordWithTombstone().allRecordsInOrder();

        // first record: the delete event, expected to be rewritten
        final Struct rewritten = (Struct) transformation.apply(events.get(0)).value();
        assertThat(rewritten.get("__deleted")).isEqualTo(true);
        assertThat(rewritten.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        assertThat(rewritten.get("__ts_ms")).isNotNull();

        // second record: the tombstone, expected to keep a null value
        final SourceRecord tombstoneResult = transformation.apply(events.get(1));
        assertThat(tombstoneResult.value()).isNull();
    }

    /**
     * Inserts a document whose only user-supplied field is an empty array and checks that, with
     * {@code array.encoding=array} and avro field name adjustment, the transformed value schema has
     * no {@code empty_array} field and the record passes {@link VerifyRecord#isValid}.
     */
    @Test
    @FixFor("DBZ-2585")
    public void testEmptyArray() throws InterruptedException, IOException {
        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        config.put("field.name.adjustment.mode", "avro");
        config.put("array.encoding", "array");
        transformation.configure(config);

        // Test insert
        final Document emptyArrayDoc = Document.parse("{'empty_array': [] }");
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(emptyArrayDoc);
        }

        final SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = records.recordsForTopic(this.topicName());
        assertThat(topicRecords.size()).isEqualTo(1);

        final SourceRecord result = transformation.apply(topicRecords.get(0));

        assertThat(result.valueSchema().field("empty_array")).isNull();
        VerifyRecord.isValid(result);
    }

    /**
     * Inserts a document with three integer fields, updates one of them via {@code $set}, and then
     * transforms the update event with {@code updateDescription.updatedFields} added as a field.
     * Checks the key, the data fields of the value and the extra
     * {@code __updateDescription_updatedFields} JSON field.
     */
    @Test
    @FixFor("DBZ-2455")
    public void testAddUpdatedFieldAfterUpdate() throws Exception {
        waitForStreamingRunning();

        final ObjectId objId = new ObjectId();
        final Document initial = new Document()
                .append("_id", objId)
                .append("a", 1)
                .append("b", 2)
                .append("c", 3);

        // insert the document and drain the resulting change event
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(initial);
        }

        SourceRecords records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // update field "a" only
        final Document setA = new Document("$set", new Document(Collect.hashMapOf("a", 22)));
        final String idFilter = "{ '_id' : { '$oid' : '" + objId + "'}}";
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(idFilter), setA);
        }

        records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        transformation.configure(Collect.hashMapOf(
                ADD_FIELDS, "updateDescription.updatedFields",
                HANDLE_TOMBSTONE_DELETES, "tombstone"));

        // Perform transformation on the update event
        final SourceRecord result = transformation.apply(records.allRecordsInOrder().get(0));
        final Struct key = (Struct) result.key();
        final Struct value = (Struct) result.value();

        // key and its schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(result.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(objId.toString());

        // value and its schema
        final Schema valueSchema = value.schema();
        assertThat(valueSchema).isSameAs(result.valueSchema());
        assertThat(value.get("_id")).isEqualTo(objId.toString());
        assertThat(value.get("a")).isEqualTo(22);

        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("a").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);

        // 4 data fields + 1 __updateDescription_updatedFields
        assertThat(valueSchema.fields()).hasSize(4 + 1);

        assertThat(valueSchema.field("__updateDescription_updatedFields").schema())
                .isEqualTo(io.debezium.data.Json.builder().optional().build());
        assertThat(value.get("__updateDescription_updatedFields")).isEqualTo("{\"a\": 22}");

        // fields untouched by the update keep their inserted values
        assertThat(value.get("b")).isEqualTo(2);
        assertThat(valueSchema.field("b").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);

        assertThat(value.get("c")).isEqualTo(3);
        assertThat(valueSchema.field("c").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
    }

    /**
     * Applies the transformation, configured with array encoding {@code array}, to every record
     * created from {@code dbz-2316.json} and expects a {@link DataException} to be thrown.
     */
    @Test(expected = DataException.class)
    @FixFor("DBZ-2316")
    public void testShouldThrowExceptionWithElementsDifferingStructures() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        config.put(ADD_FIELDS, "op,source.ts_ms");
        config.put(ARRAY_ENCODING, "array");
        transformation.configure(config);

        final SourceRecords records = createCreateRecordFromJson("dbz-2316.json");
        // records are applied in order; the expected exception aborts the iteration
        records.allRecordsInOrder().forEach(record -> transformation.apply(record));
    }

    /**
     * Inserts a document containing an integer matrix, an array of sub-documents and a matrix of
     * sub-documents (each holding an integer array), then checks that with
     * {@code array.encoding=array} the transformed record exposes the expected nested array/struct
     * schemas and values.
     */
    @Test
    @FixFor("DBZ-2569")
    public void testMatrixType() throws InterruptedException, IOException {
        transformation.configure(Collect.hashMapOf(
                "array.encoding", "array",
                HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final String json = "{"
                + "  'matrix': ["
                + "    [1,2,3],"
                + "    [4,5,6],"
                + "    [7,8,9],"
                + "  ]"
                + "  ,'array_complex': ["
                + "    {'k1' : 'v1','k2' : 1},"
                + "    {'k1' : 'v2','k2' : 2},"
                + "  ]"
                + "  ,'matrix_complex': ["
                + "    ["
                + "      {'k3' : 'v111',"
                + "       'k4' : [1,2,3]},"
                + "      {'k3' : 'v211',"
                + "       'k4' : [4,5,6]}"
                + "    ],"
                + "    ["
                + "      {'k3' : 'v112',"
                + "       'k4' : [7,8]},"
                + "      {'k3' : 'v212',"
                + "       'k4' : [8]}"
                + "    ],"
                + "  ]"
                + "}";

        // Test insert
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .insertOne(Document.parse(json));
        }
        final SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = records.recordsForTopic(this.topicName());
        assertThat(topicRecords.size()).isEqualTo(1);

        final SourceRecord result = transformation.apply(topicRecords.get(0));
        final Struct resultValue = (Struct) result.value();

        // 'matrix': array of arrays of optional int32
        final Schema matrixSchema = result.valueSchema().field("matrix").schema();
        assertThat(matrixSchema.type()).isEqualTo(Schema.Type.ARRAY);
        final Schema matrixRowSchema = matrixSchema.valueSchema().schema();
        assertThat(matrixRowSchema.type()).isEqualTo(Schema.Type.ARRAY);
        assertThat(matrixRowSchema.valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(resultValue.get("matrix")).isEqualTo(Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6),
                Arrays.asList(7, 8, 9)));

        // 'array_complex': array of structs with k1 (string) and k2 (int32)
        final Schema arrayComplexSchema = result.valueSchema().field("array_complex").schema();
        assertThat(arrayComplexSchema.type()).isEqualTo(Schema.Type.ARRAY);
        final Schema elementSchema = arrayComplexSchema.valueSchema().schema();
        assertThat(elementSchema.type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(elementSchema.field("k1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(elementSchema.field("k2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        final Field k1 = elementSchema.field("k1");
        final Field k2 = elementSchema.field("k2");
        final Struct firstElement = new Struct(elementSchema).put(k1, "v1").put(k2, 1);
        final Struct secondElement = new Struct(elementSchema).put(k1, "v2").put(k2, 2);
        assertThat(resultValue.get("array_complex")).isEqualTo(Arrays.asList(firstElement, secondElement));

        // 'matrix_complex': array of arrays of structs with k3 (string) and k4 (array of int32)
        final Schema matrixComplexSchema = result.valueSchema().field("matrix_complex").schema();
        assertThat(matrixComplexSchema.type()).isEqualTo(Schema.Type.ARRAY);
        final Schema complexRowSchema = matrixComplexSchema.valueSchema().schema();
        assertThat(complexRowSchema.type()).isEqualTo(Schema.Type.ARRAY);
        final Schema cellSchema = complexRowSchema.valueSchema();
        assertThat(cellSchema.schema().type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(cellSchema.field("k3").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(cellSchema.field("k4").schema().type()).isEqualTo(Schema.Type.ARRAY);
        assertThat(cellSchema.field("k4").schema().valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        final Field k3 = cellSchema.field("k3");
        final Field k4 = cellSchema.field("k4");
        // cellRC below denotes the cell in row R, column C of the inserted matrix
        final Struct cell11 = new Struct(cellSchema.schema()).put(k3, "v111").put(k4, Arrays.asList(1, 2, 3));
        final Struct cell21 = new Struct(cellSchema.schema()).put(k3, "v112").put(k4, Arrays.asList(7, 8));
        final Struct cell12 = new Struct(cellSchema.schema()).put(k3, "v211").put(k4, Arrays.asList(4, 5, 6));
        final Struct cell22 = new Struct(cellSchema.schema()).put(k3, "v212").put(k4, Arrays.asList(8));
        assertThat(resultValue.get("matrix_complex"))
                .isEqualTo(Arrays.asList(Arrays.asList(cell11, cell12), Arrays.asList(cell21, cell22)));
    }

    /**
     * Inserts a matrix whose rows mix integers, strings and a double, and checks that with
     * {@code array.encoding=document} each array level is emitted as a struct with positional
     * field names ({@code _0}, {@code _1}, ...) and per-element schemas.
     */
    @Test
    @FixFor("DBZ-2569")
    public void testMatrixArrayAsDocumentType() throws InterruptedException, IOException {
        transformation.configure(Collect.hashMapOf(
                "array.encoding", "document",
                HANDLE_TOMBSTONE_DELETES, "tombstone"));

        final String json = "{"
                + "  'matrix': ["
                + "    [1,'aa',3],"
                + "    [4,5,'6'],"
                + "    [7.0,8],"
                + "  ]"
                + "}";

        // Test insert
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .insertOne(Document.parse(json));
        }
        final SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = records.recordsForTopic(this.topicName());
        assertThat(topicRecords.size()).isEqualTo(1);

        final SourceRecord result = transformation.apply(topicRecords.get(0));

        // outer level: one struct field per matrix row
        final Schema matrixSchema = result.valueSchema().field("matrix").schema();
        assertThat(matrixSchema.type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(matrixSchema.fields().size()).isEqualTo(3);

        // row 0: [1,'aa',3]
        final Schema row0Schema = matrixSchema.field("_0").schema();
        assertThat(row0Schema.type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(row0Schema.fields().size()).isEqualTo(3);
        assertThat(row0Schema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(row0Schema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        assertThat(row0Schema.field("_2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);

        // row 1: [4,5,'6']
        final Schema row1Schema = matrixSchema.field("_1").schema();
        assertThat(row1Schema.type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(row1Schema.fields().size()).isEqualTo(3);
        assertThat(row1Schema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(row1Schema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        assertThat(row1Schema.field("_2").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);

        // row 2: [7.0,8]
        final Schema row2Schema = matrixSchema.field("_2").schema();
        assertThat(row2Schema.type()).isEqualTo(Schema.Type.STRUCT);
        assertThat(row2Schema.fields().size()).isEqualTo(2);
        assertThat(row2Schema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_FLOAT64_SCHEMA);
        assertThat(row2Schema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);

        final Struct resultValue = (Struct) result.value();

        // rebuild the expected struct from the verified schemas
        final Struct row0 = new Struct(row0Schema)
                .put(row0Schema.field("_0"), 1)
                .put(row0Schema.field("_1"), "aa")
                .put(row0Schema.field("_2"), 3);
        final Struct row1 = new Struct(row1Schema)
                .put(row1Schema.field("_0"), 4)
                .put(row1Schema.field("_1"), 5)
                .put(row1Schema.field("_2"), "6");
        final Struct row2 = new Struct(row2Schema)
                .put(row2Schema.field("_0"), 7.0)
                .put(row2Schema.field("_1"), 8);
        final Struct expectedMatrix = new Struct(matrixSchema)
                .put(matrixSchema.field("_0"), row0)
                .put(matrixSchema.field("_1"), row1)
                .put(matrixSchema.field("_2"), row2);
        assertThat(resultValue.get("matrix")).isEqualTo(expectedMatrix);
    }

    /**
     * Exercises documents in which an array ({@code f1.f2}) holds sub-documents whose {@code f3}
     * member is first a (possibly empty) document and later, after delete and re-insert, a
     * (possibly empty) array. Checks the schema derived for the nested {@code f5} field and the
     * sizes of the transformed arrays.
     */
    @Test
    @FixFor("DBZ-5434")
    public void shouldSupportNestedArrays() throws InterruptedException {
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));

        // Test insert: f3 is a sub-document, empty in the first array element
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .insertOne(Document.parse("{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":{}},{\"f3\":{\"f5\":5}}]}}"));
        }

        SourceRecords records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedInsert = transformation.apply(insertRecord);
        Struct transformedInsertValue = (Struct) transformedInsert.value();
        Schema transformedInsertSchema = transformedInsert.valueSchema();
        // walk f1 -> f2 (array element) -> f3 -> f5 and check the leaf type
        assertThat(transformedInsertSchema.field("f1").schema()
                .field("f2").schema().valueSchema()
                .field("f3").schema()
                .field("f5").schema().type()).isEqualTo(Schema.INT32_SCHEMA.type());
        assertThat(transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);

        // Test delete
        try (var client = connect()) {
            client.getDatabase(DB_NAME)
                    .getCollection(this.getCollectionName())
                    .deleteOne(RawBsonDocument.parse("{'_id' : ObjectId('6182b1a25711ed59dd6a1d6c')}"));
        }

        // two records are consumed for the delete
        records = consumeRecordsByTopic(2);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);

        // Test re-insert: the same _id plus a second document, now with f3 as an array
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .insertMany(Collect.arrayListOf(
                            "{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[{\"f5\":5}]}]}}",
                            "{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6d\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[]}]}}")
                            .stream().map(Document::parse).collect(Collectors.toList()));
        }

        records = consumeRecordsByTopic(2);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);

        List<SourceRecord> transformedInserts = records.allRecordsInOrder().stream().map(m -> transformation.apply(m))
                .collect(Collectors.toList());
        transformedInsertValue = (Struct) transformedInserts.get(0).value();
        assertThat(transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);

        transformedInsertValue = (Struct) transformedInserts.get(1).value();
        List<Struct> f2 = transformedInsertValue.getStruct("f1").getArray("f2");
        assertThat(f2.size()).isEqualTo(2);
        assertThat(f2.get(0).getArray("f3").size()).isEqualTo(0);
    }

    /**
     * Configures three prefixed headers ({@code updateDescription.updatedFields},
     * {@code nonexistentField} and {@code version}), performs an insert followed by a {@code $set}
     * update, and checks the header values produced for the update event.
     */
    @Test
    @FixFor({ "DBZ-5834", "DBZ-6774" })
    public void shouldAddUpdateDescription() throws Exception {
        waitForStreamingRunning();

        final Map<String, String> config = new HashMap<>();
        config.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
        config.put(ADD_HEADERS_PREFIX, "prefix.");
        config.put(ADD_HEADERS, "updateDescription.updatedFields,nonexistentField,version");
        transformation.configure(config);

        final ObjectId objId = new ObjectId();
        final Document address = new Document()
                .append("street", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document person = new Document()
                .append("_id", objId)
                .append("name", "Sally")
                .append("address", address);

        // insert
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(person);
        }

        final SourceRecords insertRecords = consumeRecordsByTopic(1);
        assertThat(insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // update
        final Document changes = new Document(Collect.hashMapOf(
                "name", "Mary",
                "zipcode", "11111"));
        final Document updateObj = new Document("$set", changes);
        final String idFilter = "{ '_id' : { '$oid' : '" + objId + "'}}";

        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(idFilter), updateObj);
        }

        final SourceRecords updateRecords = consumeRecordsByTopic(1);
        final List<SourceRecord> updatesForTopic = updateRecords.recordsForTopic(this.topicName());
        assertThat(updatesForTopic.size()).isEqualTo(1);

        // do the transform
        final SourceRecord result = transformation.apply(updatesForTopic.get(0));

        // verify headers
        final String expectedUpdateFields = "{\"name\": \"Mary\", \"zipcode\": \"11111\"}";
        assertThat(getSourceRecordHeaderByKey(result, "prefix.updateDescription_updatedFields")).isEqualTo(expectedUpdateFields);
        assertThat(getSourceRecordHeaderByKey(result, "prefix.nonexistentField")).isNull();
        assertThat(getSourceRecordHeaderByKey(result, "prefix.version")).isEqualTo(Module.version());
    }

    /**
     * Restarts the connector in the {@code CHANGE_STREAMS_WITH_PRE_IMAGE} capture mode, creates the
     * collection with pre/post images enabled, inserts and then deletes a document, and checks that
     * with delete handling {@code rewrite} the transformed delete record still carries the
     * document's fields together with {@code __deleted = true}.
     */
    @Test
    @FixFor("DBZ-6725")
    @SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
    public void shouldGenerateRecordForDeleteEventsDeleteHandlingRewrite() throws InterruptedException {
        final Configuration preImageConfig = getBaseConfigBuilder()
                .with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_WITH_PRE_IMAGE)
                .build();
        restartConnectorWithConfig(preImageConfig);
        waitForStreamingRunning();

        transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "rewrite"));

        final ObjectId objId = new ObjectId();
        final Document doc = new Document()
                .append("_id", objId)
                .append("dataStr", "Hello");
        final String idFilter = "{ '_id' : { '$oid' : '" + objId + "'}}";

        // create the collection with pre/post images enabled, then insert
        try (var client = connect()) {
            final MongoDatabase database = client.getDatabase(DB_NAME);
            final CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
            collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
            database.createCollection(this.getCollectionName(), collectionOptions);

            database.getCollection(this.getCollectionName()).insertOne(doc);
        }

        SourceRecords records = consumeRecordsByTopic(1);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // delete
        try (var client = connect()) {
            client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
                    .deleteOne(RawBsonDocument.parse(idFilter));
        }

        // two records are consumed for the delete; only the first one is transformed below
        records = consumeRecordsByTopic(2);
        assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // Perform transformation
        final SourceRecord result = transformation.apply(records.allRecordsInOrder().get(0));
        final Struct key = (Struct) result.key();
        final Struct value = (Struct) result.value();

        // key and its schema
        final Schema keySchema = key.schema();
        assertThat(keySchema).isSameAs(result.keySchema());
        assertThat(keySchema.field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(key.get("id")).isEqualTo(objId.toString());

        // value schema first, then the values themselves
        final Schema valueSchema = value.schema();
        assertThat(valueSchema.field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("dataStr").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        assertThat(valueSchema.field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        assertThat(value.get("_id")).isEqualTo(objId.toString());
        assertThat(value.get("dataStr")).isEqualTo("Hello");
        assertThat(value.get("__deleted")).isEqualTo(true);
    }

    /**
     * Enables avro field name adjustment on both the connector and the transformation, inserts a
     * document with the field {@code api-version}, and checks that the transformed value is valid
     * and exposes exactly the fields {@code _id} and {@code api_version}.
     */
    @Test
    @FixFor("DBZ-6809")
    public void testConnectorAndTransformAvroFieldNameAdjustment() throws InterruptedException, IOException {
        final Configuration connectorConfig = getBaseConfigBuilder()
                .with(CommonConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE, "avro")
                .build();
        restartConnectorWithConfig(connectorConfig);

        transformation.configure(Collect.hashMapOf("field.name.adjustment.mode", "avro"));

        // Test insert
        final Document doc = new Document("_id", 0).append("api-version", "2.5");
        try (var client = connect()) {
            client.getDatabase(DB_NAME)
                    .getCollection(this.getCollectionName())
                    .insertOne(doc);
        }
        final SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = records.recordsForTopic(this.topicName());
        assertThat(topicRecords.size()).isEqualTo(1);

        final SourceRecord result = transformation.apply(topicRecords.get(0));
        VerifyRecord.isValid(result);

        final Struct resultValue = (Struct) result.value();
        final List<String> fieldNames = resultValue.schema().fields().stream()
                .map(Field::name)
                .collect(Collectors.toList());
        assertThat(fieldNames).containsOnly("_id", "api_version");
    }
}
